import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoFlatMap
{

    public static void main(String[] args) throws Exception
    {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource01 = env.fromElements("aa kk", "bb", "aa", "dd", "dd");
        DataStreamSource<Integer> streamSource02 = env.fromElements(11,22,33,22,11);
        ConnectedStreams<String, Integer> connectedStreams = streamSource01.connect(streamSource02);


//        connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>());

        SingleOutputStreamOperator<String> result = connectedStreams.flatMap(new CoFlatMapFunction<String, Integer, String>()
        {
            @Override
            public void flatMap2(Integer value, Collector<String> out) throws Exception
            {
                out.collect(value.toString());

            }


            @Override
            public void flatMap1(String value, Collector<String> out) throws Exception
            {
                for (String word: value.split(" "))
                {
                    out.collect(word);
                }

            }


        });

        result.print();


        env.execute();
    }

}







//代码改写自:
//https://www.mdeditor.tw/pl/2MlL